const util = require('../util')
const api = require('../api')
const config = require('../config')
const timer = require('../timer')
const mongo = require('../db_helper')
const ObjectId = require('mongodb').ObjectId

const logger = util.getLogger('task.streaming')

/**
 * Submit a streaming batch to Livy and wait until it is running or has terminated.
 *
 * After the batch is posted, a timer task named `${jobId}#${jobGroup.id}-${jobGroup.title}`
 * polls the batch state on every cycle. For a streaming job only the `running`
 * state counts as success; every terminal state (`success`, `dead`, `killed`,
 * `error`) is reported as a failure.
 *
 * @param {object} jobGroup - sub-task definition; `args` is posted to Livy, `id` and `title` name the polling task.
 * @param {string} jobId - id of the owning job; used as the `_id` of the document updated in `mongo.jobs`.
 * @returns {Promise<object>} resolves with `{info, livy, yarn}` once the batch is running;
 *   rejects with `{info, livy, yarn}` on a terminal state, or with the thrown error when a poll fails.
 */
const run = async (jobGroup, jobId) => {
  logger.debug('开始执行任务：', jobGroup)
  const result = await util.callLivy(api.postBatch, jobGroup.args)
  logger.debug('response batch ; ', result )
  const batchId = result.id
  return new Promise((suc, fail) => {
    const taskName = `${jobId}#${jobGroup.id}-${jobGroup.title}`
    timer.addTask({
      name: taskName,
      cycle: 10000,   // polling interval (presumably milliseconds — confirm against the timer module)
      fun: async () => {
        try {
          const info = await util.callLivy(api.getBatch, undefined, batchId)
          logger.debug(`${jobId}#${jobGroup.id} info : `, info)
          let log
          switch (info.state) {
            case 'running':
              timer.removeTask(taskName)  // stop polling
              log = await util.callLivy(api.getBatchLog, undefined, batchId)
              logger.debug(`streaming job state=${info.state}, ${jobId}#${jobGroup.id} log : `, log)
              // Persist the Livy batch id (info.id) on the job document as `streamingAppId`.
              await mongo.jobs.updateOne({
                _id: new ObjectId(jobId)
              }, {
                $set: {
                  streamingAppId: info.id
                }
              })
              suc({info, livy: log, yarn: `streaming 不输出 yarn 日志，请在 yarn web ui 中查看， applicationId=${info.id}`})    // success callback
              break
            case 'success':
            case 'dead':
            case 'killed':
            case 'error':
              // Every terminal state is a failure for a streaming job; only `running` is success.
              // `killed` and `error` are included so that a batch ending in one of those states
              // does not leave the timer polling forever with the promise never settled.
              timer.removeTask(taskName)  // stop polling
              log = await util.callLivy(api.getBatchLog, undefined, batchId)
              logger.debug(`streaming job state=${info.state}, ${jobId}#${jobGroup.id} log : `, log)
              // On failure, try to export the YARN log to a file under the publish directory.
              if (info.appId) {
                try {
                  const logFile = `${config.upload.publishDir}/log/yarn-${jobId}-${jobGroup.id}.log`
                  const commond = `yarn logs -applicationId ${info.appId} > ${logFile}`
                  await util.runExec(commond)
                  fail({info, livy: log, yarn: `${config.upload.urlPrefix}/log/yarn-${jobId}-${jobGroup.id}.log`})    // failure callback
                } catch (e) {
                  logger.warn('导出 yarn 日志异常：', e)
                  fail({info, livy: log, yarn: e.stack})    // failure callback
                }
              } else {
                fail({info, livy: log, yarn: 'applicationId is null'})    // failure callback
              }
              break
            // Any other state: keep polling on the next cycle.
          }
        } catch (e) {
          // NOTE(review): when the exception comes from the getBatch call the polling task
          // stays registered after this rejection (unchanged behaviour) — confirm whether
          // it should be removed here as well.
          fail(e)   // failure callback
        }
      }
    })
  })
}

/**
 * Remove a streaming sub-task: cancel its polling timer and, if the job document
 * carries a `streamingAppId`, ask Livy to delete that batch.
 *
 * @param {object} jobGroup - sub-task definition; `id` and `title` identify the polling timer task.
 * @param {string} jobId - id of the owning job document in `mongo.jobs`.
 * @returns {Promise<void>}
 */
const remove = async (jobGroup, jobId) => {
  const taskName = `${jobId}#${jobGroup.id}-${jobGroup.title}`
  logger.info('删除子任务定时器：' + taskName)
  timer.removeTask(taskName)

  // Load the job from mongo to find the batch id saved by `run`.
  const jobs = await mongo.jobs.find({_id: new ObjectId(jobId)}).toArray()
  const streamingAppId = jobs && jobs.length ? jobs[0].streamingAppId : undefined
  // Explicit null/empty check instead of truthiness: the stored value is the Livy
  // batch id, and Livy numbers batches from 0, so id 0 must still be killed.
  if (streamingAppId != null && streamingAppId !== '') {
    logger.info('kill streaming appcation id = ' + streamingAppId)
    // Kill the batch.
    const result = await util.callLivy(api.delBatch, undefined, streamingAppId)
    logger.info('停止 streaming 任务：', result)
  }
}

// Streaming job watchdog: on every cycle, look at all jobs with status '1' that
// have a streaming sub-task flagged `autoRun`, and re-run those whose Livy batch
// is no longer listed (or was never recorded).
timer.addTask({
  name: 'streaming-jobs-listense',
  cycle: 5 * 60 * 1000,
  fun: async () => {
    logger.debug('开始检查 streaming job')
    const jobs = (await mongo.jobs.find({status: '1'}).toArray())
      .map(x => {
        // Attach the first streaming sub-task (if any) to the job as `streaming`.
        const streamingJobArray = x.group.filter(g => g.type === 'streaming')
        if (streamingJobArray && streamingJobArray.length) {
          return Object.assign(x, {streaming: streamingJobArray[0]})
        } else {
          return x
        }
      })
      .filter(x => x.streaming && x.streaming.autoRun)
    logger.info(`需检查 streaming job 的数量：${jobs.length}`)
    const res = await util.callLivy(api.getBatches)
    logger.debug(`正在运行的job数量： ${res.sessions ? res.sessions.length : 0}`)
    const runningApps = new Set(res.sessions ? res.sessions.map(x => x.id) : [])
    // Required lazily inside the task rather than at the top of the file
    // (presumably to avoid a circular require with './' — verify).
    const task = require('./')
    for (const job of jobs) {
      // Explicit null check instead of truthiness: `streamingAppId` holds the Livy
      // batch id, and Livy numbers batches from 0, so id 0 is a valid recorded batch.
      if (job.streamingAppId != null) {
        if (runningApps.has(job.streamingAppId)) {
          logger.debug(`streaming job : ${job.title} - ${job.streaming.title} 运行正常。`)
        } else {
          logger.warn(`streaming job : ${job.title} - ${job.streaming.title} 非正常退出，需保存日志并重新运行`)
          const logId = new ObjectId(job.lastLog)
          const log = await mongo.logs.findOne({_id: logId})
          if (log) {
            log.endTime = new Date()
            // Use `find`, not `filter`: the fields must be set on the log entry itself,
            // not on a throwaway array, or the change is lost when the log is saved.
            const streamingLog = (log.group || []).find(x => x.type === 'streaming')
            if (streamingLog) {
              streamingLog.state = '2'
              // `Date.prototype.format` is not built in; presumably installed elsewhere in the project — verify.
              streamingLog.e = `${streamingLog.e || ''}\n${new Date().format('yyyy-MM-dd hh:mm:ss')} 检测到job异常退出。`
            }
            await mongo.logs.replaceOne({ _id: logId }, log)
          } else {
            // A missing log must not abort the check of the remaining jobs.
            logger.warn(`streaming job : ${job.title} - ${job.streaming.title} 未找到上次运行日志，跳过日志更新。`)
          }
          task.runJob(job)    // fire-and-forget: not awaited
        }
      } else {
        logger.warn(`streaming job : ${job.title} - ${job.streaming.title} 未运行，需重新运行。`)
        // Run the job.
        task.runJob(job)    // fire-and-forget: not awaited
      }
    }
  }
})

// 向livy提交 spark 任务，并定时查询运行状态与日志
module.exports = {
  run, remove
}